package com.atguigu.flink.sql.window;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.*;

/**
 * Created by Smexy on 2023/9/16
 *
 *  计数窗口在sql中使用，需要用到processingTime。
 *      根据数据的processingTime来判断到达的顺序。
 *
 *      计数窗口： https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/tableapi/#group-windows
 *      不支持sql，只有TableAPI

 */
public class Demo1_CountWindow
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
                env.setParallelism(1);
        
                SingleOutputStreamOperator<WaterSensor> ds = env
                    .socketTextStream("hadoop102", 8888)
                    .map(new WaterSensorMapFunction())
                    ;
        
                Schema schema = Schema.newBuilder()
                                      .column("id", "STRING")
                                      .column("ts", "BIGINT")
                                      .column("vc", "INT")
                                      .columnByExpression("pt", "PROCTIME()")
                                      .build();
        
                
                Table table = tableEnv.fromDataStream(ds,schema);

                /*
                    size = 3
                    滚动
                        Tumble.over(窗口大小)
                        Tumble.on(处理时间)
                        Tumble.as("起别名")

                    窗口大小由 rowInterval 定义

                    GroupWindow: TableAPI中的计数窗口
                 */
        TumbleWithSizeOnTimeWithAlias w1 = Tumble
            .over(rowInterval(3l))
            .on($("pt"))
            .as("w");

        /*
            滑动
            Slide.every(滑动步长)

            TableAPI中的滑动窗口，和DataStreamAPI中的滑动窗口有区别：
                    DataStreamAPI中： 到达了slide，就触发运算
                    TableAPI： 到达了slide，slide < size ，第一次计算必须满足size才会触发运算
         */
        SlideWithSizeAndSlideOnTimeWithAlias w2 = Slide
            .over(rowInterval(3l))
            .every(rowInterval(2l))
            .on($("pt"))
            .as("w");


        //按照并行度的分类是在计算时定义
        table
            .window(w2)
            //全局窗口
                //.groupBy($("w"))
            //除了窗口外，还使用了其他字段分组，就是Keyed窗口，其他字段就是Key
                .groupBy($("w"),$("id"))
                .select($("id"),$("vc").sum().as("sumVc"))
                .execute()
                .print();


        env.execute();
    }

}
